package mr

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"io/ioutil"
	"log"
	"net/rpc"
	"os"
	"sort"
	"strconv"
	"time"
)

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
	Key   string
	Value string
}

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	registReply := RegistResp{}
	if ok := call("Master.RegisterWorker", &RegistArgs{}, &registReply); !ok {
		return
	}
	workerID := registReply.WorkerID

	// Your worker implementation here.
	// send call to request task
	tryAccessTimes := 0
	for {
		args := RequestArgs{-1, -1, workerID}
		reply := ResponseType{}

		if ok := call("Master.AcceptWorker", &args, &reply); !ok {
			fmt.Println("worker > request failed, sleep...")
			time.Sleep(100 * time.Millisecond)
			tryAccessTimes++
			if tryAccessTimes > 5 {
				fmt.Println("worker > cannot access master. Quit")
				return
			}
			continue
		}

		tryAccessTimes = 0
		if reply.NReduce == -1 {
			fmt.Println("worker > exit")
			return
		}

		// fmt.Println("apply job success")
		switch reply.JobType {
		case MAP: // map job
			mapJob(&reply, mapf)
		case REDUCE:
			reduceJob(&reply, reducef)
		}
	}
}

func mapJob(reply *ResponseType,
	mapf func(string, string) []KeyValue) {
	// 打开原始文件
	file, err := os.Open(reply.BucketName)
	if err != nil {
		log.Fatalf("cannot open %v", reply.BucketName)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", reply.BucketName)
	}
	file.Close()

	// 调用用户的map函数
	kva := mapf(reply.BucketName, string(content))

	// 对于每个key值，划分为nReduce个组
	var groups = make([][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		gid := ihash(kv.Key) % reply.NReduce
		groups[gid] = append(groups[gid], kv)
	}

	// 将中间文件写入disk,注意写入的是NReduce个不同文件
	for index, kvs := range groups {
		// 创建或打开intermediate文件
		filename := "mr-" + strconv.Itoa(reply.TaskNum) + "-" + strconv.Itoa(index)
		// file, _ = os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
		file, err := ioutil.TempFile(".", "mr-")
		if err != nil {
			log.Fatalln("cannot create temporary file")
		}

		// content := encode(kvs)

		// n, err := file.Write(content)
		// if err != nil || n != len(content) {
		// 	log.Fatalln("cannot write", file.Name())
		// }
		// err = file.Close()
		// if err != nil {
		// 	log.Fatalln("cannot close", file.Name())
		// }
		// err = os.Rename(file.Name(), filename)
		// if err != nil {
		// 	log.Fatalln("cannot rename to", filename)
		// }
		// log.Printf("%s created.", filename)
		enc := json.NewEncoder(file)
		for _, kv := range kvs {
			err := enc.Encode(&kv)
			if err != nil {
				os.Remove(file.Name())
				// call for failure
				log.Fatal("map write file error")
			}
		}
		os.Rename(file.Name(), filename)
		if err != nil {
			log.Fatalln("cannot rename to", filename)
		}
		log.Printf("%s created.", filename)
	}

	rpArgs := RequestArgs{}
	rpArgs.JobType = MAP
	rpArgs.TaskNum = reply.TaskNum

	rpReply := ResponseType{}

	call("Master.WorkerFinished", &rpArgs, &rpReply) // TODO: 考虑失败的worker

}

func reduceJob(reply *ResponseType,
	reducef func(string, []string) string) {
	// 读取所有属于taskNum的mr-X-taskNum.txt文件 到 intermediate
	var intermediate []KeyValue
	for i := 0; ; i++ { // i对应map任务号
		filename := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.TaskNum)
		file, err := os.Open(filename)
		if err != nil {
			break
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}

	// sort
	sort.Sort(ByKey(intermediate))

	// 注意！使用临时文件，防止残缺文件被写入！
	oname := fmt.Sprintf("mr-out-%v", reply.TaskNum)
	ofile, err := ioutil.TempFile(".", "mr-")
	// oname := "mr-out-" + strconv.Itoa(reply.TaskNum)
	// ofile, err := os.Create(oname)
	if err != nil {
		log.Fatalln("cannot create temporary file")
	}

	fmt.Printf("worker > inter len : %v\n", len(intermediate))
	// call the user define reduce function
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { // the same key has been sorted to be together
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}
	err = ofile.Close()
	if err != nil {
		log.Fatalln("cannot close", oname)
	}
	err = os.Rename(ofile.Name(), oname)
	if err != nil {
		log.Fatalln("cannot rename to", oname)
	}

	rpArgs := RequestArgs{}
	rpArgs.JobType = REDUCE
	rpArgs.TaskNum = reply.TaskNum

	rpReply := ResponseType{}

	call("Master.WorkerFinished", &rpArgs, &rpReply) // 目前，未考虑失败的worker
}

//
// example function to show how to make an RPC call to the master.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {

	// declare an argument structure.
	args := ExampleArgs{}

	// fill in the argument(s).
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	call("Master.Example", &args, &reply)

	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}

//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
